{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 04: Matrix - An Exercise in Parallelism\n",
    "\n",
    "An early use for Spark has been Machine Learning. Spark's `MLlib` of algorithms contains classes for vectors and matrices, which are important for many ML algorithms. This exercise uses a simpler representation of matrices to explore another topic; explicit parallelism.\n",
    "\n",
    "The sample data is generated internally; there is no input that is read. The output is written to the file system as before.\n",
    "\n",
    "See the corresponding Spark job [Matrix4.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/Matrix4.scala).\n",
    "\n",
    "Let's start with a class to represent a Matrix."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class Matrix\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "/**\n",
    " * A special-purpose matrix case class. Each cell is given the value\n",
    " * i*N + j for indices (i,j), counting from 0.\n",
    " * Note: Must be serializable, which is automatic for case classes.\n",
    " */\n",
    "case class Matrix(m: Int, n: Int) {\n",
    "  assert(m > 0 && n > 0, \"m and n must be > 0\")\n",
    "\n",
    "  private def makeRow(start: Long): Array[Long] =\n",
    "    Array.iterate(start, n)(i => i+1)\n",
    "\n",
    "  private val repr: Array[Array[Long]] =\n",
    "    Array.iterate(makeRow(0), m)(rowi => makeRow(rowi(0) + n))\n",
    "\n",
    "  /** Return row i, <em>indexed from 0</em>. */\n",
    "  def apply(i: Int): Array[Long]  = repr(i)\n",
    "\n",
    "  /** Return the (i,j) element, <em>indexed from 0</em>. */\n",
    "  def apply(i: Int, j: Int): Long = repr(i)(j)\n",
    "\n",
    "  private val cellFormat = {\n",
    "    val maxEntryLength = (m*n - 1).toString.length\n",
    "    s\"%${maxEntryLength}d\"\n",
    "  }\n",
    "\n",
    "  private def rowString(rowI: Array[Long]) =\n",
    "    rowI map (cell => cellFormat.format(cell)) mkString \", \"\n",
    "\n",
    "  override def toString = repr map rowString mkString \"\\n\"\n",
    "}"
   ]
  },
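  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a minimal sketch of how the class builds its rows, the following plain-Scala snippet uses `Array.iterate` the same way `makeRow` does. The names `n`, `row0`, and `row1` are illustrative only and are not part of the tutorial code.\n",
    "\n",
    "```scala\n",
    "// Array.iterate(start, len)(f) yields start, f(start), f(f(start)), ... (len elements).\n",
    "val n = 3                                             // hypothetical column count\n",
    "val row0 = Array.iterate(0L, n)(i => i + 1)           // Array(0, 1, 2)\n",
    "val row1 = Array.iterate(row0(0) + n, n)(i => i + 1)  // Array(3, 4, 5)\n",
    "// Element (i, j) therefore holds i*n + j:\n",
    "assert(row1(2) == 1*n + 2)\n",
    "```\n",
    "\n",
    "Each row starts `n` above the start of the previous row, which is why the cell at `(i,j)` holds `i*n + j`."
   ]
  },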
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Some variables:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "nRows = 5\n",
       "nCols = 10\n",
       "out = output/matrix4\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "output/matrix4"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val nRows = 5\n",
    "val nCols = 10\n",
    "val out = \"output/matrix4\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's create a matrix."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "matrix = \n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       " 0,  1,  2,  3,  4,  5,  6,  7,  8,  9\n",
       "10, 11, 12, 13, 14, 15, 16, 17, 18, 19\n",
       "20, 21, 22, 23, 24, 25, 26, 27, 28, 29\n",
       "30, 31, 32, 33, 34, 35, 36, 37, 38, 39\n",
       "40, 41, 42, 43, 44, 45, 46, 47, 48, 49"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val matrix = Matrix(nRows, nCols)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With a Scala data structure like this, we can use `SparkContext.parallelize` to convert it into an `RDD`. In this case, we'll actually create an `RDD` with a count of indices for the number of rows, `1 to nRows`. Then we'll map over that `RDD` and use it compute the average of each row's columns. Finally, we'll \"collect\" the results back to an `Array` in the driver."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "sums_avgs = Array((45,4), (145,14), (245,24), (345,34), (445,44))\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "[(45,4), (145,14), (245,24), (345,34), (445,44)]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val sums_avgs = sc.parallelize(1 to nRows).map { i =>\n",
    "  // Matrix indices count from 0.\n",
    "  val sum = matrix(i-1) reduce (_ + _)  // Recall that \"_ + _\" is the same as \"(i1, i2) => i1 + i2\".\n",
    "  (sum, sum/nCols)                      // We'll return RDD[(sum, average)]\n",
    "}.collect                               // ... then convert to an array"
   ]
  },
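  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The number of partitions, and so the upper bound on parallel tasks, can also be requested explicitly through the optional second argument of `parallelize`. The following is a minimal sketch, assuming the `sc`, `matrix`, and `nRows` values defined in the cells above; the names `numSlices`, `indices`, and `sums` are introduced here only for illustration.\n",
    "\n",
    "```scala\n",
    "// Sketch only: assumes `sc`, `matrix`, and `nRows` from the cells above.\n",
    "val numSlices = 2                                  // hypothetical partition count\n",
    "val indices = sc.parallelize(0 until nRows, numSlices)\n",
    "println(indices.getNumPartitions)                  // number of partitions Spark created\n",
    "// `matrix` is captured by the closure, so it is serialized and shipped to the tasks.\n",
    "val sums = indices.map(i => matrix(i).sum).collect\n",
    "```\n",
    "\n",
    "Using `0 until nRows` avoids the `i-1` adjustment, at the cost of departing from the 1-based indices used above."
   ]
  },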
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "`RDD.parallelize` is a convenient way to convert a data structure into an RDD."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercises"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 1: Try different values of nRows and nCols"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 2: Try other statistics, like standard deviation\n",
    "\n",
    "The code for the standard deviation that you would add is the following:\n",
    "\n",
    "```scala\n",
    "val row = matrix(i-1)\n",
    "...\n",
    "val sumsquares = row.map(x => x*x).reduce(_+_)\n",
    "val stddev = math.sqrt(1.0*sumsquares) // 1.0* => so we get a Double for the sqrt!\n",
    "```\n",
    "\n",
    "Given the synthesized data in the matrix, are the average and standard deviation actually very meaningful here, if this were representative of real data?"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
